package cn.smileyan.demos;


import cn.smileyan.demos.core.CpuCheckMapFunction;
import cn.smileyan.demos.core.TaskProcessingFunction;
import cn.smileyan.demos.entity.TaskClusterData;
import cn.smileyan.demos.entity.TaskInput;
import cn.smileyan.demos.entity.TaskOutput;
import cn.smileyan.demos.core.CountAndTimeTrigger;
import cn.smileyan.demos.io.CommonKafkaBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.DynamicEventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SessionWindowTimeGapExtractor;

import java.util.Objects;


/**
 * Flink job entry point: consumes task messages from Kafka, merges records
 * belonging to the same task using dynamic event-time session windows, runs a
 * CPU-usage check on each merged cluster, and writes results back to Kafka.
 *
 * @author smileyan
 */
@Slf4j
public class CpuCheckJob {
    /**
     * Command-line arguments:
     *  -bs  Kafka broker address, e.g. localhost:9092
     *  -kcg Kafka consumer group
     *  -it  Kafka input topic, e.g. test-input-topic
     *  -ot  Kafka output topic, e.g. test-output-topic
     *  -ct  optional flag; when present, topics are created automatically (no value needed)
     *  -pt  optional, number of topic partitions, default 1
     *  -rf  optional, topic replication factor, default 1
     *  -gs  optional, per-message session window gap, default 1000 ms
     *  example:
     *  -bs localhost:9092 -it test-input-topic -ot test-output-topic -pt 1 -rf 1 -ct
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final MultipleParameterTool parameterTool = MultipleParameterTool.fromArgs(args);
        log.info("starting flink job and kafka topic is {}", parameterTool.get("it"));
        final CommonKafkaBuilder commonKafkaBuilder = new CommonKafkaBuilder(parameterTool);

        final KafkaSource<TaskInput> source = commonKafkaBuilder.buildSource(TaskInput.class);
        final KafkaSink<TaskOutput> kafkaSink = commonKafkaBuilder.buildSink(TaskOutput.class);

        // "-gs" is documented and used in milliseconds (default 1000 ms):
        // SessionWindowTimeGapExtractor returns a gap in ms, so the previous
        // name "gapSeconds" was misleading.
        final long gapMillis = parameterTool.getLong("gs", 1000L);
        // The session gap scales with the cluster size carried by each element,
        // so larger clusters get proportionally longer windows to collect in.
        final DynamicEventTimeSessionWindows<TaskInput> dynamicWindow = DynamicEventTimeSessionWindows.withDynamicGap(
                (SessionWindowTimeGapExtractor<TaskInput>) element -> gapMillis * element.getClusterSize());

        final DataStreamSource<TaskInput> dataStreamSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // Merge all non-null records of one task that fall into the same
        // dynamic session window; the custom trigger fires on count or time.
        SingleOutputStreamOperator<TaskClusterData> mergedTaskData = dataStreamSource
                .filter(Objects::nonNull)
                .keyBy(TaskInput::getTaskId)
                .window(dynamicWindow)
                .trigger(new CountAndTimeTrigger<>())
                .process(new TaskProcessingFunction())
                .name("taskProcessing");

        // Evaluate CPU usage for each merged task cluster.
        SingleOutputStreamOperator<TaskOutput> resultData = mergedTaskData
                .map(new CpuCheckMapFunction())
                .name("cpu usage check");

        resultData.sinkTo(kafkaSink);

        env.execute("Flink Kafka Example");
    }

}
